
[SPARK-44872][CONNECT][FOLLOWUP] Deflake ReattachableExecuteSuite and increase retry buffer #42908

Closed

Conversation

@juliuszsompolski
Contributor

What changes were proposed in this pull request?

Deflake tests in ReattachableExecuteSuite and increase CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE.

Why are the changes needed?

Two tests could be flaky, failing with INVALID_CURSOR.POSITION_NOT_AVAILABLE errors.
This happens when the server releases a response once it falls more than CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE behind the latest response it sent. However, because of HTTP/2 flow control, the released responses could still be in transit to the client. In the test suite, we were explicitly disconnecting the iterators and reconnecting later; in some cases they could not reconnect, because the response they had last seen had fallen too far behind.

This change not only adjusts the suite, but also the default config, which potentially makes reconnecting more robust. In normal situations it should not lead to increased memory pressure, because clients also release responses using ReleaseExecute as soon as they are received. Buffered responses should normally be freed by ReleaseExecute, and this retry buffer is only a fallback mechanism. Therefore, it is safe to increase the default.

In practice, this only has an effect when there are actual network errors, and the increased buffer size should make reconnects more robust in those cases.
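For anyone tuning this further, here is a minimal sketch of overriding the buffer size for a local session backing a Connect server. The config key used below is an assumption inferred from the constant name CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE; check org.apache.spark.sql.connect.config.Connect for the authoritative key.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the config key is assumed from the constant name,
// not verified against Connect.scala.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.connect.execute.reattachable.observerRetryBufferSize", "10m")
  .getOrCreate()
```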

Does this PR introduce any user-facing change?

No.

How was this patch tested?

ReattachableExecuteSuite.
Also did more manual experiments on how far the last response seen by the client can lag behind the latest response sent by the server (because of the HTTP/2 flow control window).

Was this patch authored or co-authored using generative AI tooling?

No.

@juliuszsompolski
Contributor Author

cc @hvanhovell @dongjoon-hyun

```diff
@@ -139,7 +139,7 @@ object Connect {
       "With any value greater than 0, the last sent response will always be buffered.")
     .version("3.5.0")
     .bytesConf(ByteUnit.BYTE)
-    .createWithDefaultString("1m")
+    .createWithDefaultString("10m")
```
Member

Since the purpose is to deflake ReattachableExecuteSuite and increase the retry buffer, shall we increase this only in ReattachableExecuteSuite instead of touching the default value?

Contributor Author

I explained in the PR description why I think increasing this default is a genuine improvement: it will make reconnects more robust in case of actual network issues, while not increasing memory pressure in the normal scenario (where the client controls the flow with ReleaseExecute).
Since Spark 3.5 was released before this suite was added, making the change now is low risk, and it will have good baking-in time before the next release.

Contributor Author

I would rather not change it only in the suite, because the suite is supposed to stress test how the actual client behaves when faced with disconnects. Overriding it there would sweep under the carpet what the recent experiments suggest: that the default was a bit too small for retry robustness.
This is not a major issue; in practice it only matters when Connect faces real intermittent connectivity issues, where before this mechanism was implemented it would simply fail.

```diff
-class SparkConnectServerTest extends SharedSparkSession {
+trait SparkConnectServerTest extends SharedSparkSession {
```
@juliuszsompolski (Contributor Author) commented Sep 14, 2023

Having it as a class was making it execute as a suite with no tests, while still running the beforeAll / afterAll.
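For context, a minimal sketch of the difference under ScalaTest (names here are illustrative, not the actual Spark classes): a concrete class extending a suite base gets discovered and run as an empty suite, so beforeAll / afterAll still fire; a trait only ever runs through the suites that mix it in.

```scala
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

// Discovered by ScalaTest as a runnable suite with zero tests; beforeAll and
// afterAll still execute, starting and stopping a server for nothing.
class ServerTestBaseAsClass extends AnyFunSuite with BeforeAndAfterAll {
  override def beforeAll(): Unit = { /* start test server */ }
  override def afterAll(): Unit = { /* stop test server */ }
}

// Not instantiable on its own, so never discovered directly; the setup and
// teardown run only for suites that mix it in.
trait ServerTestBase extends AnyFunSuite with BeforeAndAfterAll {
  override def beforeAll(): Unit = { /* start test server */ }
  override def afterAll(): Unit = { /* stop test server */ }
}

class ExampleSuite extends ServerTestBase {
  test("uses the shared server") { /* ... */ }
}
```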

@dongjoon-hyun
Member

It seems that another suite has started to fail. Is it related?

[Screenshot: SparkConnectSessionHolderSuite failures in CI, 2023-09-14]

@LuciferYang
Contributor

Just to confirm, will the case mentioned in #42560 (comment) also be fixed by this PR?

@juliuszsompolski
Contributor Author

@dongjoon-hyun I don't think the SparkConnectSessionHolderSuite failures are related, and I don't know what's going on there.

Streaming foreachBatch worker is starting with url sc://localhost:15002/;user_id=testUser and sessionId 9863bb98-6682-43ad-bc86-b32d8486fb47.
Traceback (most recent call last):
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/pandas/utils.py", line 27, in require_minimum_pandas_version
    import pandas
ModuleNotFoundError: No module named 'pandas'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 86, in <module>
    main(sock_file, sock_file)
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 51, in main
    spark_connect_session = SparkSession.builder.remote(connect_url).getOrCreate()
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/session.py", line 464, in getOrCreate
    from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/session.py", line 19, in <module>
    check_dependencies(__name__)
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/utils.py", line 33, in check_dependencies
    require_minimum_pandas_version()
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/pandas/utils.py", line 34, in require_minimum_pandas_version
    raise ImportError(
ImportError: Pandas >= 1.0.5 must be installed; however, it was not found.
[info] - python foreachBatch process: process terminates after query is stopped *** FAILED *** (1 second, 115 milliseconds)

Streaming query listener worker is starting with url sc://localhost:15002/;user_id=testUser and sessionId ab6cfcde-a9f1-4b96-8ca3-7aab5c6ff438.
Traceback (most recent call last):
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/pandas/utils.py", line 27, in require_minimum_pandas_version
    import pandas
ModuleNotFoundError: No module named 'pandas'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/streaming/worker/listener_worker.py", line 99, in <module>
    main(sock_file, sock_file)
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/streaming/worker/listener_worker.py", line 59, in main
    spark_connect_session = SparkSession.builder.remote(connect_url).getOrCreate()
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/session.py", line 464, in getOrCreate
    from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/session.py", line 19, in <module>
    check_dependencies(__name__)
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/connect/utils.py", line 33, in check_dependencies
    require_minimum_pandas_version()
  File "/home/runner/work/apache-spark/apache-spark/python/pyspark/sql/pandas/utils.py", line 34, in require_minimum_pandas_version
    raise ImportError(
ImportError: Pandas >= 1.0.5 must be installed; however, it was not found.
[info] - python listener process: process terminates after listener is removed *** FAILED *** (434 milliseconds)
[info]   java.io.EOFException:

It looks to me like some (intermittent?) environment issue.

@juliuszsompolski
Contributor Author

@LuciferYang I tried looking at #42560 (comment) but did not reproduce it yet. If you have more instances of CI runs where it failed with that stack overflow, that could be useful.
Inspecting the code, I don't see how that iterator could get looped like that...

@dongjoon-hyun (Member) left a comment

+1, LGTM. Thank you, @juliuszsompolski and all.
Given the current situation, I believe we can proceed further after merging this.

@dongjoon-hyun
Member

Merged to master.

@LuciferYang
Contributor

LuciferYang commented Sep 18, 2023

> @LuciferYang I tried looking at #42560 (comment) but did not reproduce it yet. If you have more instances of CI runs where it failed with that stack overflow, that could be useful. Inspecting the code, I don't see how that iterator could get looped like that...

It seems that this issue is relatively easy to reproduce on GitHub Actions. In the past three days, the daily tests with Scala 2.13 have all experienced a StackOverflowError.

@LuciferYang
Contributor

```
dev/change-scala-version.sh 2.13
build/sbt "connect/test" -Pscala-2.13
```

@juliuszsompolski When I run the above commands in a local test, it is easier to reproduce the StackOverflowError.

@LuciferYang
Contributor

> @LuciferYang I tried looking at #42560 (comment) but did not reproduce it yet. If you have more instances of CI runs where it failed with that stack overflow, that could be useful. Inspecting the code, I don't see how that iterator could get looped like that...

Although it seems quite magical: after #42981 eliminated an ambiguous reference in org.apache.spark.sql.connect.client.CloseableIterator, the test `abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error` no longer hits java.lang.StackOverflowError in my local tests. Let's monitor the GA tests with Scala 2.13.
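As a minimal illustration of the failure mode (not the actual Spark Connect code): if a method in an iterator wrapper is meant to delegate to the wrapped iterator but the bare name resolves to the wrapper's own member, every call re-enters itself until the stack overflows.

```scala
trait CloseableIterator[A] extends Iterator[A] {
  def close(): Unit
}

class WrappingIterator[A](inner: Iterator[A]) extends CloseableIterator[A] {
  // BUG: intended to delegate to inner.hasNext, but the bare name resolves
  // to this wrapper's own hasNext, so every call recurses into itself and
  // eventually throws java.lang.StackOverflowError.
  override def hasNext: Boolean = hasNext
  override def next(): A = inner.next()
  override def close(): Unit = ()
}
```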

@LuciferYang
Contributor

LuciferYang commented Sep 19, 2023

@dongjoon-hyun @juliuszsompolski I think branch-3.5 also needs this PR, since #42560 has also been merged into branch-3.5.

@hvanhovell
Contributor

Let me backport it to 3.5.

hvanhovell pushed a commit that referenced this pull request Sep 19, 2023
… increase retry buffer

[Commit message duplicates the PR description above.]

Closes #42908 from juliuszsompolski/SPARK-44872-followup.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@hvanhovell
Contributor

I have cherry-picked this to 3.5.

@LuciferYang
Contributor

Thanks @hvanhovell
